Workflow Engine in Erlang Текущий мой проект напрямую связан с моим магистерским дипломом. Сейчас я могу смело заявлять, что работаю по специальности. Тема моего магистрантского диплома на Факультете Прикладной Математики была "Архитектура и программирование систем управления бизнес-процессами". В этой работе был исследован комплекс математических, лингвистических (как сейчас модно говорить DSL) и других обеспечений, которые лежат в основе таких систем. В основе систем управления процессами лежат Сети Петри, переходы между состояниями FSM тоже отмечены вершинами. Этот математический аппарат очевидно предшествовал модели модели акторов. В работе были рассмотрены также другие модели организации бизнес процессов: Event-Condition-Action модели (или как сейчас модно говорить реакционной модели), алгоритм RETE и дедуктивные системы правил. Уже тогда мной были рассмотрены все существующие на тот момент Workflow DSL, такие как XPDL (WfMC), Skelta, OpenWFE, Shark, J2, jBPM. Создавая тогда свою систему на .NET 1.0 я объединил ECA и Workflow подходы. Но я тогда не знал Erlang. Сейчас я реализовал аналогичную по мощности систему в 300 строк Эрланг кода. За основу я взял современный BPMN 2.0 стандарт, который объединил все существующие системы на рынке через 8 лет после моей работы. Поскольку формат этого блога и размер этой системы мне позволяют, я приведу фактически весь код этого движка на Эрланге. 1. Core Модель BPMN 2.0 Здесь конечно не все, но здесь как раз эти 20% которые покрывают 80% workflows. -record(task, { name, roles=[], module }). -record(userTask, { name, roles=[], module }). -record(serviceTask, { name, roles=[], module }). -record(receiveTask, { name, roles=[], module }). -record(messageEvent, { name, payload=[], timeout=[], module }). -record(beginEvent , { name, module }). -record(endEvent, { name, module }). -record(sequenceFlow, { source, target }). -record(history, { ?ITERATOR(feed,true), name, task }). -record(process, { ?ITERATOR(feed,true), name, roles=[], tasks=[], events=[], history=[], flows=[], rules, docs=[], task, beginEvent, endEvent }). Здесь ?ITERATOR это макрос из KVS для персистентности процессов и их логов выполнения. 2. Портал в FSM aka boundaryEvent aka sync В каждом FSM есть желание триггернуть состояние напрямую. В erlang такое тоже есть и у нас тоже есть. Я много написал gen_server и gen_fsm. И решил не использовать gen_fsm вообще никогда. Это broken by design elrnag otp behaviour. process_event(Event, Proc) -> Targets = bpe_task:targets(Event#messageEvent.name,Proc), {Status,{Reason,Target},ProcState} = bpe_event:handle_event(Event,bpe_task:find_flow(Targets),Proc), NewProcState = ProcState#process{task = Target}, FlowReply = fix_reply({Status,{Reason,Target},NewProcState}), kvs:put(NewProcState), FlowReply. 3. Синхронный walking по графу: process_flow(Proc) -> Curr = Proc#process.task, Term = [], Task = bpe:task(Curr,Proc), Targets = bpe_task:targets(Curr,Proc), {Status,{Reason,Target},ProcState} = case {Targets,Proc#process.task} of {[],Term} -> bpe_task:already_finished(Proc); {[],Curr} -> bpe_task:handle_task(Task,Curr,Curr,Proc); {[],_} -> bpe_task:denied_flow(Curr,Proc); {List,_} -> bpe_task:handle_task(Task,Curr,bpe_task:find_flow(List),Proc) end, kvs:add(#history { id = kvs:next_id("history",1), feed_id = {history,ProcState#process.id}, name = ProcState#process.name, task = {task, Curr} }), NewProcState = ProcState#process{task = Target}, FlowReply = fix_reply({Status,{Reason,Target},NewProcState}), kvs:put(NewProcState), FlowReply. 4. gen_server API Все workflow процессы имеют свои неубиваемые erlang процессы. Которые после перезапуска поднимают из базы свое состояние. Мечта каждой workflow системы. Абсолютно бесплатная в Erlang. handle_call({get},_,Proc) -> { reply,Proc,Proc }; handle_call({start},_,Proc) -> process_flow(Proc); handle_call({complete},_,Proc) -> process_flow(Proc); handle_call({event,Event},_,Proc) -> process_event(Event,Proc); handle_call({amend,Form},_,Proc) -> process_flow(Proc#process{docs=[Form|Proc#process.docs]}); 5. Sample Process Ну и собственно пример как это видят прикладные прогрммисты: -module(sampleproc_process). -author('Maxim Sokhatsky'). -include_lib("bpe/include/bpe.hrl"). -compile(export_all). definition() -> #process { name = 'Create Deposit Account', flows = [ #sequenceFlow{source='Init', target='Payment'}, #sequenceFlow{source='Payment', target='Signatory'}, #sequenceFlow{source='Payment', target='Process'}, #sequenceFlow{source='Process', target='Final'}, #sequenceFlow{source='Signatory', target='Process'}, #sequenceFlow{source='Signatory', target='Final'} ], tasks = [ #userTask { name='Init', module = deposit }, #userTask { name='Signatory', module = deposit}, #serviceTask { name='Payment', module = deposit}, #serviceTask { name='Process', module = deposit}, #endEvent { name='Final'} ], beginEvent = 'Init', endEvent = 'Final', events = [ #messageEvent{name="PaymentReceived"} ] }. action({request,'Init'}, Proc) -> {reply,Proc}; action({request,'Payment'}, Proc) -> Payment = bpe:doc(#payment_notification{},Proc), case is_tuple(Payment) of true -> {reply,'Process',Proc}; false -> {reply,'Signatory',Proc} end; action({request,'Signatory'}, Proc) -> {reply,'Process',Proc}; action({request,'Process'}, Proc) -> Account = #user{id=Proc#process.id}, kvs:add(Account), {reply,Proc}; action({request,'Final'}, Proc) -> {reply,Proc}. Enterprise is fun again :-) ______________________________ [1]. М.Сохацкий. Архитектура и программирование систем управления бизнес-процессами. [2]. SpawnProc BPE Erlang Workflow Engine.